-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Lazy parameters adaptation (part 1 - ZSTD_c_stableInBuffer) #2974
Conversation
lib/compress/zstd_compress.c
Outdated
@@ -6156,17 +6163,23 @@ size_t ZSTD_compressSequences(ZSTD_CCtx* const cctx, void* dst, size_t dstCapaci | |||
/*====== Finalize ======*/ | |||
|
|||
/*! ZSTD_flushStream() : | |||
* Note : not compatible with ZSTD_c_stableInBuffer |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this comment accurate?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nope, that's a leftover, to be fixed
lib/compress/zstd_compress.c
Outdated
size_t const inputSize = input->size - input->pos; /* no obligation to start from pos==0 */ | ||
if ( (cctx->requestedParams.inBufferMode == ZSTD_bm_stable) /* input is presumed stable, across invocations */ | ||
&& (endOp == ZSTD_e_continue) /* no flush requested, more input to come */ | ||
&& (inputSize < ZSTD_BLOCKSIZE_MAX) ) { /* not even reached one block yet */ | ||
/* just wait, allows lazy adaptation of compression parameters */ | ||
cctx->expectedInBuffer = *input; | ||
return ZSTD_FRAMEHEADERSIZE_MIN(cctx->requestedParams.format); /* at least some header to produce */ | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think that this is correct. I get that the use case of streaming + stable input is somewhat contrived, and that it isn't super useful in practice. But...
Zstd guarantees to make forward progress. Either consume some input or produce some output. This breaks that contract, since input
is not modified.
A loop constructed like this should not infinite loop:
while (!(input empty || output empty || finished))
ZSTD_compressStream(cctx, &output, &input);
Instead, we should save the initial input->pos
, and update input->pos = input->size
.
Additionally, we shouldn't directly set cctx->expectedInBuffer
, ZSTD_setBufferExpectations(cctx, output, input)
should be used instead. The call on line 5656 can probably be moved up to handle both lazy & non-lazy adaptation, but only for the first call of lazy adaptation.
Lastly, since we are now modifying the input, we should make sure that we are calling FORWARD_IF_ERROR(ZSTD_checkBufferStability(cctx, output, input, endOp), "invalid buffers");
in the lazy-adaptation mode.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Zstd guarantees to make forward progress.
Forward progress was indeed one of my concerns when writing that code.
Some nuances here : there are limits to forward progress "guarantees" that zstd
provides, even with current code.
As a perhaps obvious example, if a user continuously presents an empty input, then there is nothing to do. Same thing when a user continuously presents an output buffer with no more available space to flush into, then the streaming interface cannot progress. In both cases, repetitively calling ZSTD_compressStream()
will just loop forever.
But it's okay, I guess we can classify above scenarios as user errors, that users should be able to debug, while the scenario you advocate for is different. What you mean is that, if it is possible to make some forward progresses, aka there is room in the output buffer and there is some input to ingest, then zstd
should make some forward progress.
In this instance, instead of returning immediately without doing anything, zstd
should fake progress by pretending having consumed the input, even though it hasn't, so that the suggested loop condition, which relies on forward progress, can exit.
I think that's fair, so that's what latest commit in this PR provides.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we shouldn't directly set
cctx->expectedInBuffer
,ZSTD_setBufferExpectations(cctx, output, input)
should be used instead.
ZSTD_setBufferExpectations()
relies on cctx->appliedParams
which is not initialized yet since at this stage ZSTD_CCtx_init_compressStream2()
has not been invoked yet.
I think we probably want to hold this until after the release, yeah? |
It's not critical for the release. |
ping, this should be ready for review and merge now |
ping 2 |
lib/compress/zstd_compress.c
Outdated
if (cctx->savedInPosPlusOne == 0) cctx->savedInPosPlusOne = input->pos + 1; | ||
cctx->expectedInBuffer = *input; | ||
/* pretend input was consumed, to give a sense forward progress */ | ||
input[0].pos = input[0].size; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not input->pos = input->size
? It seems strange to access it as an array.
OK, so having investigated the issue in more details, it's more complex than initially anticipated. And the necessity to "pretend" consuming all input data (even though it's not) doesn't help. The scenario suggested by @terrelln does indeed fail, as predicted. This scenario was part of the test suite, but only the error codes were checked. A full roundtrip comparison exposes the missing bytes. This one use case can be fixed. But then, other cases appear, and it becomes more complex. What about :
After the Now, this can be solved too. It's just that the topic goes beyond "late adaptation of compression parameters", it becomes "how to allow So I'll have to spend a bit of time to figure out how to do that without impacting too much the library. And if not, then maybe just drop it. This was more a "late parameter adaptation demo" anyway. |
25794fa
to
df78010
Compare
Good news, the new version shipped this morning seems to fix the issues reported above. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR looks good to me, just need to change some asserts to RETURN_ERROR_IF()
, and a few more tests.
The approach seems right, and simplifies the usage of ZSTD_c_stableInBuffer
.
DEBUGLOG(5, "ZSTD_compressStream_generic, flush=%i, srcSize = %zu", (int)flushMode, input->size - input->pos); | ||
assert(zcs != NULL); | ||
if (zcs->appliedParams.inBufferMode == ZSTD_bm_stable) { | ||
assert(input->pos >= zcs->stableIn_notConsumed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be a RETURN_ERROR_IF()
right? Since we're validating user input, not library assumptions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This function ZSTD_compressStream_generic()
is invoked after ZSTD_checkBufferStability()
,
which should have already validated that ZSTD_c_stableInBuffer
condition is correctly respected,
aka input->pos
is stable and must not have been directly manipulated by the user.
So if we reach that assert()
condition, it looks to me that this would be a programmatic error, not (just) user input.
@@ -5645,8 +5669,27 @@ size_t ZSTD_compressStream2( ZSTD_CCtx* cctx, | |||
|
|||
/* transparent initialization stage */ | |||
if (cctx->streamStage == zcss_init) { | |||
FORWARD_IF_ERROR(ZSTD_CCtx_init_compressStream2(cctx, endOp, input->size), "CompressStream2 initialization failed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you point out in the comment inputSize = input->size - input->pos
. So this is actually a bug, and we'll get the pledgedSrcSize
wrong.
Can you add a test case for the scenario where input->pos
starts off as non-zero? It should fail before the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We won't get pledgedSrcSize
wrong because it's based on totalInputSize
, which correctly counts from initial pos
, wether it is 0 or non-0, by summing the inputSize
across invocations.
Point taken for the input->pos != 0
start condition test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm talking about the previous version of the code. It is buggy before your patch, and correct after.
You fixed the bug, so I would love to see the test case, so it doesn't accidentally get re-introduced.
if (cctx->stableIn_notConsumed) { | ||
assert(cctx->appliedParams.inBufferMode == ZSTD_bm_stable); | ||
/* some early data was skipped - make it available for consumption */ | ||
assert(input->pos >= cctx->stableIn_notConsumed); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should also be a RETURN_ERROR_IF()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same idea, ZSTD_checkBufferStability()
is invoked just before this paragraph, and should already take care of user input issues. So if we nonetheless hit this assert()
at this position, seems like it's a programmatic error.
lib/compress/zstd_compress.c
Outdated
assert(input->src == cctx->expectedInBuffer.src); | ||
assert(input->pos == cctx->expectedInBuffer.size); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be RETURN_ERROR_IF()
since we're validating user input.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, in this case, these checks are front line, they are effectively validating user input. They should return errors.
ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; | ||
int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); | ||
ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; | ||
input.size = input.pos; /* do not ingest more input during flush */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be an assert(input.size == input.pos)
instead?
I believe that we should always have consumed the entire input, is that correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even with ZSTD_c_stableInBuffer
, it's possible to meet a situation where some input has not been consumed.
This happens if output buffer size is not large enough, in which case, the streaming state machine will be stopped during a flush stage, with some input not yet consumed.
After that ZSTD_flushStream()
does not present inBuffer
anymore, so it's not possible to update inBuffer->pos
. ZSTD_flushStream()
can only flush what's left in the output buffer, not ingest a bit more.
This situation is a bit different from ZSTD_compressStream2(cctx, outbuf, inbuf, ZSTD_e_flush)
, which allows ingesting a bit more input as part of the flush operation.
lib/compress/zstd_compress.c
Outdated
ZSTD_inBuffer const nullInput = { NULL, 0, 0 }; | ||
int const stableInput = (zcs->appliedParams.inBufferMode == ZSTD_bm_stable); | ||
ZSTD_inBuffer input = stableInput ? zcs->expectedInBuffer : nullInput; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can you move this into a small helper function that is shared between ZSTD_flushStream()
and ZSTD_endStream()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
tests/zstreamtest.c
Outdated
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); | ||
inBuf.size = 200; | ||
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); | ||
CHECK_Z( ZSTD_flushStream(cctx, &outBuf) ); | ||
inBuf.size = inputSize; | ||
CHECK_Z( ZSTD_compressStream(cctx, &outBuf, &inBuf) ); | ||
CHECK(ZSTD_endStream(cctx, &outBuf) != 0, "compression should be successful and fully flushed"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Can you also test ZSTD_compressStream2()
, since it isn't going through the exact same code-path as ZSTD_{compress,flush,end}Stream()
is?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
on streaming compression implementation.
effectively makes ZSTD_c_stableInput compatible ZSTD_compressStream() and zstd_e_continue operation mode.
including flushStream(). Now the only condition is for `input.size` to continuously grow.
specifically, there is no obligation to start streaming compression with pos=0. stableSrc mode is now compatible with this setup.
as suggested by @terrelln. Also : commented zstreamtest more to ensure ZSTD_stableInBuffer is tested/
df78010
to
af3d9c5
Compare
had to create a new error code for this condition, none of the existing ones were fitting enough.
I believe final commit answers all remaining issues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, just one minor nit about consistency of the new error code
@@ -38,6 +38,7 @@ const char* ERR_getErrorString(ERR_enum code) | |||
case PREFIX(tableLog_tooLarge): return "tableLog requires too much memory : unsupported"; | |||
case PREFIX(maxSymbolValue_tooLarge): return "Unsupported max Symbol Value : too large"; | |||
case PREFIX(maxSymbolValue_tooSmall): return "Specified maxSymbolValue is too small"; | |||
case PREFIX(stabilityCondition_notRespected): return "pledged buffer stability condition is not respected"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ZSTD_checkBufferStability()
is currently using srcBuffer_wrong
and dstBuffer_wrong
. If you're going to add this code, can you please switch them over too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good idea
68d2d89
to
f2d9652
Compare
This is merely a way to "prove" that lazy parameters adaptation can work,
though this implementation only makes it happen when
ZSTD_c_stableInBuffer
is enabled.ZSTD_c_stableInBuffer
pushes the responsibility of buffering to the user,making lazy parameters adaptation simpler to achieve.
As a (favorable) side effect, it extends the nb of use cases supported by
ZSTD_c_stableInBuffer
.Specifically, this parameter is now compatible with "appending" scenarios,
where more data is continuously added after compression has started,
instead of being restricted to presenting the entire input immediately at start.
It also makes
ZSTD_c_stableInBuffer
compatible with the regular streaming interfaceaka
initCStream
,compressStream
,flushStream
andendStream
(was previously limited to
ZSTD_compressStream2( , , , zstd_e_end)
only).